
Flink: Add null check to writers to prevent resurrecting null values #12049

Open · wants to merge 2 commits into main from null-value-check
Conversation

@mxm (Contributor) commented Jan 22, 2025

Flink's BinaryRowData uses a magic byte to indicate null values in the backing byte arrays. Flink's internal RowData#createFieldGetter method, which Iceberg uses, only adds a null check when a type is nullable. We map Iceberg's optional attribute to nullable, but Iceberg's required attribute to non-nullable. The latter creates an issue when the user nulls a field by mistake: because the null marker is never checked, the resulting RowData field is interpreted as actual data. This yields random values for fields that should have been null and should instead have produced an error in the writer.

The solution is to always check if a field is nullable before attempting to read data from it.

// Always check the null marker, even for required fields. Without this
// check, a nulled required field produces incorrect writes instead of
// failing with a NullPointerException.
if (struct.isNullAt(index)) {
  return null;
}
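As a toy illustration of the failure mode and the fix (hypothetical names, not the actual Flink/Iceberg classes): a getter that consults the null marker first returns null for a mistakenly-nulled required field, so the writer can fail fast instead of reading leftover backing bytes as data.

```java
// Toy model of the fix, with hypothetical names (not the real Flink/Iceberg
// classes). A struct exposes a per-field null marker plus the raw value.
interface Struct {
  boolean isNullAt(int index);

  int getInt(int index);
}

final class AlwaysNullCheckingGetter {
  // Always consult the null marker, even for fields Iceberg declares as
  // required. A mistakenly-nulled required field then surfaces as null
  // (which the writer can reject) instead of as garbage backing bytes.
  static Object getField(Struct struct, int index) {
    if (struct.isNullAt(index)) {
      return null;
    }
    return struct.getInt(index);
  }
}
```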
Actual fix is here.

@mxm mxm force-pushed the null-value-check branch from 5ecff96 to f763c54 Compare January 22, 2025 18:01
@pvary (Contributor) commented Jan 23, 2025

@mxm: Please remove the 1.18 and 1.19 changes from the PR. It is much easier to review this way and to apply the changes requested during review. Once the PR has been merged, we can backport the changes to the other Flink versions.

QQ: What happens when we have a type discrepancy between the Iceberg type and the RowData type? Could we have issues with other conversions? Do we have a way to prevent those?

@mxm mxm force-pushed the null-value-check branch 3 times, most recently from b8700c9 to 734b7bb Compare January 23, 2025 11:18
@mxm (Contributor, author) commented Jan 23, 2025

> @mxm: Please remove the 1.18 and 1.19 changes from the PR. It is much easier to review this way and to apply the changes requested during review. Once the PR has been merged, we can backport the changes to the other Flink versions.

Makes sense! Done.

> QQ: What happens when we have a type discrepancy between the Iceberg type and the RowData type? Could we have issues with other conversions? Do we have a way to prevent those?

Type discrepancies between Iceberg and Flink types will error in Flink's TypeSerializer for the given field. For example, an int field uses IntSerializer, which only accepts Integer. This raises a NoSuchMethodError during serialization. As long as we use the same serializer for deserialization as well, we should be fine, and that is the case.
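As a rough sketch of that type-check behavior (a toy stand-in, not Flink's actual TypeSerializer API): a serializer typed to Integer rejects any other class at serialization time, so a type discrepancy fails loudly rather than writing corrupt bytes. In this toy version the mismatch surfaces as a ClassCastException; the exact error Flink raises depends on the call site.

```java
// Toy stand-in for a typed serializer (hypothetical class, not Flink's
// TypeSerializer). Any value that is not an Integer fails the cast below
// instead of being encoded as corrupt bytes.
final class IntOnlySerializer {
  byte[] serialize(Object value) {
    // The cast enforces the declared type; in this sketch a mismatch
    // surfaces as a ClassCastException.
    int v = (Integer) value;
    return new byte[] {(byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v};
  }
}
```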

@mxm (Contributor, author) commented Jan 23, 2025

@pvary I had to re-add the 1.18 and 1.19 changes, but they are in a separate commit. The reason is that I modified a test base class which also affects 1.18 and 1.19. We can't build otherwise.

@mxm mxm force-pushed the null-value-check branch 2 times, most recently from dba054f to 2f031e3 Compare January 23, 2025 13:17
@mxm mxm force-pushed the null-value-check branch from 2f031e3 to f4893cc Compare January 23, 2025 13:52
@mxm mxm force-pushed the null-value-check branch from f4893cc to 59dacfb Compare January 23, 2025 13:58
@mxm (Contributor, author) commented Jan 23, 2025

Tests are green.

@mxm (Contributor, author) commented Jan 23, 2025

CC @stevenzwu

@@ -50,6 +50,7 @@
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import org.junit.Ignore;
Is this a JUnit 4 or JUnit 5 test?
Should this be @Ignore, or @Disabled?

@@ -39,14 +39,20 @@
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.Ignore;

@Ignore or @Disabled?
